package org.databandtech.flink.source;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Connection;
import java.sql.ResultSet;

public class SourceFromMySQL extends RichSourceFunction<Tuple2<String,Integer>> {

	private static final long serialVersionUID = 3120905365885782365L;
	PreparedStatement preparedStatement;
    Connection connection;
    String[] COLUMNS;
    Tuple item;
    String SQL;
    String URL = "";
    String USERNAME = "";
    String PASSWORD = "";
    
    public SourceFromMySQL(String url, String username,String password,String[] columns,String sql) {
        super();
        this.COLUMNS = columns;
        this.SQL = sql;
        this.URL = url;
        this.USERNAME = username;
        this.PASSWORD = password;
    }

    @Override
    public void open(Configuration parameters) {
        String driver = "com.mysql.jdbc.Driver";        
        try {
			super.open( parameters );
	        Class.forName( driver );
	        connection = DriverManager.getConnection( URL, USERNAME, PASSWORD );
	        String sql = SQL;
	        preparedStatement = connection.prepareStatement( sql );
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }

    @Override
    public void close()  {
        try {
			super.close();
			if (connection != null) {
	            connection.close();
	        }
	        if (preparedStatement != null) {
	            preparedStatement.close();
	        }
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}       
    }

    @Override
    public void run(SourceContext<Tuple2<String,Integer>> sourceContext)  {
        try {
            ResultSet resultSet = preparedStatement.executeQuery();
            
            while (resultSet.next()) {
            	Tuple2<String,Integer> obj = new Tuple2<String,Integer>();
            	obj.setField(resultSet.getString(COLUMNS[0]).trim(), 0);
            	obj.setField(resultSet.getInt(COLUMNS[1]), 1);
            	sourceContext.collect( obj );
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cancel() {

    }

}
